需求背景

最近在写商用云平台的虚拟充电桩设备,本来想着在之前虚拟 IPC 的基础上修改一些类型字段、加一些特定的适配协议和逻辑就可以了,后来发现还是有些不足的地方。

现有的虚拟设备是基于 Python locust 框架写的,消息上报实现方案是预定义好上报的周期,比如每分钟上报一次,在 locust 中预构建一个子任务,每分钟调用一次消息上报接口。这样的方法不足是,没法模拟真实设备的场景,随时触发任意类型的消息进行上报。另外,如果在运行过程中,需要手动修改某些虚拟设备的配置信息,也无法实现,只能停止后修改再重新运行。

换句话说,现有的虚拟设备工具,所有配置和逻辑都只能在运行前就预定义好,一旦运行之后就无法再介入修改了。

需要实现的优化目标是,在虚拟设备工具运行过程中,外部触发某个条件,能够直接影响内部正在运行的任务,这就涉及到了 Python 进程间的通信。

概念

进程是操作系统分配和调度系统资源(CPU、内存)的基本单位。进程之间是相互独立的,每启动一个新的进程相当于把数据进行了一次克隆,子进程里的数据修改无法影响到主进程中的数据,不同子进程之间的数据也不能直接共享,这是多进程在使用中与多线程最明显的区别。

常用的进程间通信方法有很多:

  1. 信号量( semaphore )
  2. 信号 ( signal )
  3. 管道( pipe )
  4. 消息队列( message queue )
  5. 共享内存( shared memory )
  6. 套接字( socket )
  7. 文件 ( file )

具体到 Python,以上提到的进程间通信方法也都有对应实现。

信号量(semaphore)

信号量是一个共享资源访问者的计数器,可以用来控制多个进程对共享资源的并发访问数。它常作为一种锁机制,防止指定数量的进程正在访问共享资源时,其他进程也访问该资源。每次有一个进程获得信号量时,计数器 -1,若计数器为 0 时,其他进程就停止访问信号量,一直阻塞直到其他进程释放信号量。

示例:

import os
import time
from multiprocessing import Process, Semaphore

def handle(sem):
    sem.acquire()
    print(f"{int(time.time())}, {os.getpid()} 开始处理事件")
    time.sleep(1)
    print(f"{int(time.time())}, {os.getpid()} 结束处理事件")
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(4)
    for i in range(5):
        p = Process(target=handle, args=(sem,))
        p.start()

运行结果:

1653378426, 216 开始处理事件
1653378426, 3888 开始处理事件
1653378426, 9064 开始处理事件
1653378426, 13968 开始处理事件
1653378427, 216 结束处理事件
1653378427, 17160 开始处理事件
1653378427, 9064 结束处理事件
1653378427, 3888 结束处理事件
1653378427, 13968 结束处理事件
1653378428, 17160 结束处理事件

可以看到,同时最多只有 4 个进程处理事件,当一个进程的事件处理结束释放信号量后,第 5 个进程才能开始处理事件。

信号量常用于控制某共享资源的多进程并发访问者数量,并不适用于进程之间传输具体数据。

信号(sginal)

信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。

进程间信号最先出现于 UNIX 系统,每个信号都有自己的系统调用,后续修改为统一的 signal()kill() 调用。最初的进程间信号系统是异步的,而且没有队列的概念,即不同信号间很容易产生冲突,导致应用程序来不及处理前一个信号。POSIX 规范后来改进了这一设计,另外规定了实时信号,靠队列的方式避免了信号冲突的问题。

为了统一各系统下进程间信号与其整数的统一,POSIX 规范规定了 19 个信号及其对应整数与行为:

Signal     Value     Action   Comment
───────────────────────────────────
SIGHUP        1       Term    Hangup detected on controlling terminal or death of controlling process
SIGINT        2       Term    Interrupt from keyboard
SIGQUIT       3       Core    Quit from keyboard
SIGILL        4       Core    Illegal Instruction
SIGABRT       6       Core    Abort signal from abort(3)
SIGFPE        8       Core    Floating point exception
SIGKILL       9       Term    Kill signal
SIGSEGV      11       Core    Invalid memory reference
SIGPIPE      13       Term    Broken pipe: write to pipe with no readers
SIGALRM      14       Term    Timer signal from alarm(2)
SIGTERM      15       Term    Termination signal
SIGUSR1   30,10,16    Term    User-defined signal 1
SIGUSR2   31,12,17    Term    User-defined signal 2
SIGCHLD   20,17,18    Ign     Child stopped or terminated
SIGCONT   19,18,25    Cont    Continue if stopped
SIGSTOP   17,19,23    Stop    Stop process
SIGTSTP   18,20,24    Stop    Stop typed at terminal
SIGTTIN   21,21,26    Stop    Terminal input for background process
SIGTTOU   22,22,27    Stop    Terminal output for background process

在 Linux/UNIX 下,由于 SIGINT 与 SIGTSTP 信号较为常用,这两个信号可以分别使用 Ctrl+CCtrl+Z 快捷键触发,Windows 支持前者,但不支持后者。此外在 Linux/UNIX 下还有一个不常用的 Ctrl+\ 快捷键,用于发送 SIGQUIT 信号。

首先介绍一个简单的方式,即异常捕获。Python 脚本运行过程中按下中断键(如 Ctrl+C)会触发一个 KeyboardInterrupt 异常,我们只要在需要处理中断的代码段外使用 try...except... 将其包裹起来即可,如下:

try:
    # Some code
except KeyboardInterrupt:
    # Another code

使用上文异常捕获的方式存在若干不足。一方面,对于一个庞大的系统来说,可能在不同的执行阶段对于退出有不同的处理方式;另一方面,尽管使用 Ctrl+C 热键触发 SIGINT 中断是最常见的方式,但并非所有 SIGINT 信号都是通过热键触发,也并非所有信号都是 SIGINT。Python 为了实现信号的安装,引入了 signal 模块。下文以 SIGINTSIGTERM 为例,简述该模块的使用。

import signal

def bye(signum, frame):
    print("Bye bye")
    exit(0)

signal.signal(signal.SIGINT, bye)
signal.signal(signal.SIGTERM, bye)

while True:
    pass

当执行过程中按下 Ctrl+C 或在其他终端窗口中输入 kill -2 [pid](2 的含义见上表)时,可以看到 bye(signum, frame) 函数被调用,并成功退出。

与信号量类似,信号仅用于进程间传递特定信号,并不适用于数据传输。

管道(pipe)

管道常用来在两个进程间进行通信,两个进程分别位于管道的两端。

Python multiprocessing 模块的 Pipe 方法返回(conn1, conn2)代表一个管道的两个端。Pipe 方法有 duplex 参数,如果 duplex 参数为 True(默认值),那么这个参数是全双工模式,也就是说 conn1 和 conn2 均可收发。若 duplex 为 False,conn1 只负责接收消息,conn2 只负责发送消息。sendrecv 方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用 conn1.send 发送消息,conn1.recv 接收消息。如果没有消息可接收,recv 方法会一直阻塞。如果管道已经被关闭,那么 recv 方法会抛出 EOFError

示例:创建两个进程,一个子进程通过 Pipe 发送数据,一个子进程通过 Pipe 接收数据。

from multiprocessing import Pipe, Process
import os
import time

def proc_send(pipe, data):
    for d in data:
        print(f"{os.getpid()} send: {d}")
        pipe.send(d)
        time.sleep(1)

def proc_recv(pipe):
    while True:
        print(f"{os.getpid()} recv: {pipe.recv()}")
        time.sleep(1)

if __name__ == '__main__':
    pipe = Pipe()
    p1 = Process(target=proc_send, args=(pipe[0], [i for i in range(5)]))
    p2 = Process(target=proc_recv, args=(pipe[1],))
    p1.start()
    p2.start()

输出如下:

16132 send: 0
16596 recv: 0
16132 send: 1
16596 recv: 1
16132 send: 2
16596 recv: 2
16132 send: 3
16596 recv: 3
16132 send: 4
16596 recv: 4

消息队列(message queue)

消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。

Python Queue 是多进程安全的队列,可以使用 Queue 实现多进程之间的数据传递。

put 方法用以插入数据到队列中, put 方法还有两个可选参数: blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,该方法会阻塞 timeout 指定的时间,直到该队列有剩余的空间。如果超时,会抛出 Queue.full 异常。如果 blocked 为 False,但该 Queue 已满,会立即抛出 Queue.full 异常。

get 方法可以从队列读取并且删除一个元素。同样, get 方法有两个可选参数: blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,那么在等待时间内没有取到任何元素,会抛出 Queue.Empty 异常。如果 blocked 为 False,有两种情况存在,如果 Queue 有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出 Queue.Empty 异常。

示例:创建两个进程,一个子进程通过 Queue 发送数据,一个子进程通过 Queue 接收数据。

from multiprocessing import Queue, Process
import time
import os

def send(q, data):
    for d in data:
        print(f"{os.getpid()} send: {d}")
        q.put(d)
        time.sleep(1)

def recv(q):
    while True:
        if not q.empty():
            print(f"{os.getpid()} recv: {q.get()}")
            time.sleep(1)
        else:
            continue

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=send, args=(q, [i for i in range(5)]))
    p2 = Process(target=recv, args=(q,))
    p1.start()
    p2.start()

输出如下:

16564 send: 0
9984 recv: 0
16564 send: 1
9984 recv: 1
16564 send: 2
9984 recv: 2
16564 send: 3
9984 recv: 3
16564 send: 4
9984 recv: 4

管道和消息队列可用于进程间传输数据,但每次传递的数据大小受限,效率偏低。

共享内存(shared memory)

共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的进程间通信方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号量,配合使用,来实现进程间的同步和通信。

Python 标准库中实现共享内存通信的工具有 mmap,但是该库只能用于基本类型,且需要预先分配存储空间,对于自定义类型的对象使用起来有诸多不便。

比较好用的第三方工具有 apache 开源的 pyarrow,不需要预先定义存储空间且任意可序列化的对象均可存入共享内存。但使用时需要注意:pyarrow 反序列化的对象为只读对象不可修改其值,想要修改对象可先通过对象 copy。

Python 3.8 及以上版本支持 multiprocessing.shared_memory,一般来说,进程被限制只能访问属于自己进程空间的内存,但是共享内存允许跨进程共享数据,从而避免通过进程间发送消息的形式传递数据。相比通过磁盘、套接字或者其他要求序列化、反序列化和复制数据的共享形式,直接通过内存共享数据拥有更出色性能。

shared_memory 的使用也非常简单,只需要定义一个 ShareableList,就可以在多个进程之间访问同一对象。

ShareableList 提供一个可修改的类 list 对象,其中所有值都存放在共享内存块中。这限制了可被存储在其中的值只能是 int, float, bool, str (每条数据小于 10M), bytes (每条数据小于 10M)以及 None 这些内置类型。它另一个显著区别于内置 list 类型的地方在于它的长度无法修改(比如,没有 append, insert 等操作)且不支持通过切片操作动态创建新的 ShareableList 实例。

a.py:

from multiprocessing import shared_memory

shared_list = shared_memory.ShareableList([None, 1, "haha", False, b'123'], name="test")
while True:
    pass

b.py:

from multiprocessing import shared_memory

data = shared_memory.ShareableList(name="test")
for d in data:
    print(d)

先运行 a.py,再运行 b.py,得到的运行结果为:

None
1
haha
False
b'123'

相比于管道和消息队列,共享内存具有更高的性能和更大的数据传输量。

套接字(socket)

socket 无疑是通信使用最为广泛的方式了,它不但能跨进程还能跨网络。在两个进程中创建 socket 连接,就能实现相互通信。

基本的 socket 就是基于以太网的套接字,也是最常用的。server 端先创建一个套接字,绑定一个本地端口,等待 client 连接;client 也创建一个套接字,直接连接到 server 端就可以正常进行通信了。不过,传输的数据必须为 byte 类型。

server.py:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("localhost", 2333))
sock.listen()

c, addr = sock.accept()
print(f"Accept connect from {addr}")
while True:
    data = c.recv(1024).decode()
    if data:
        print(data)
        c.send(f"{data} Reply".encode())

client.py:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("localhost", 2333))

sock.send(b"Data")
print(sock.recv(1024).decode())

先运行 server.py,再运行 client.py,server 端输出:

Accept connect from ('127.0.0.1', 52410)
Data

client 端输出:

Data Reply

Unix domain socket:

当同一个机器的多个进程使用普通套接字进行通信时,需要经过网络协议栈,这非常浪费,因为同一个机器根本没有必要走网络。所以 Unix 提供了一个套接字的特殊版本,它使用和套接字一摸一样的 api,但是地址不再是网络端口,而是文件。相当于我们通过某个特殊文件来进行套接字通信,不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。

具体实现上,只需要在创建套接字的时候指定创建方式为 socket.AF_UNIX 即可:

server_addr = "./tmp_sock"
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(server_addr)

不过 Unix 域套接字仅适用于 Unix 系统,在 Windows 系统下还是老实用以太网套接字绑定 localhost 吧。

文件(file)

使用文件进行通信是最简单的一种通信方式,一个进程将结果输出到临时文件,另一个进程从文件中读出来。

这个实现很简单,就不做示例了,不过感觉效率很低,频繁读写文件可能还存在同步和性能开销问题,不是很推荐使用。

总结

本文对 Python 中的各类进程间通信实现方案进行了介绍。

使用总结:

  • 仅进程同步不涉及数据传输,可以使用信号、信号量;
  • 若进程间需要传递少量数据,可以使用管道、消息队列;
  • 若进程间需要传递大量数据,最佳方式是使用共享内存,这样减少数据拷贝、传输的时间内存代价;
  • 跨主机的进程间通信(RPC)可以使用 socket 通信,但 socket 同样可用于相同主机不同进程间的通信。

对各类进程间通信方案,只是做了简单的试用,并未涉及实现原理、冲突处理、锁机制、性能表现等,将在实际开发使用过程中继续深入了解。

参考